package com.vf.cloud.rendering.server.dispatch.server;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.jfinal.kit.JsonKit;
import com.vf.cloud.rendering.common.constant.Cache;
import com.vf.cloud.rendering.common.factory.UEFactory;
import com.vf.cloud.rendering.common.util.GpuUtil;
import com.vf.cloud.rendering.server.dispatch.DispatchServer;
import com.vf.cloud.rendering.server.dispatch.handler.Handler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Server implements Runnable {

	private EventLoopGroup group;
	private ExecutorService executor;
	private Channel channel;

	/**
	 * 0-停止 1-正在启动 2-运行中 3-停止中
	 */
	private int status = 0;

	public void run() {
		executor = Executors.newFixedThreadPool(1);
		try {
			executor.submit(new Callable<String>() {
				@Override
				public String call() throws Exception {
					start();
					return "";
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private void start() {
		try {
			status = 1;
			log.info(">>>正在启动DispatchServer");
			Handler handler = new Handler();
			Bootstrap bootstrap = new Bootstrap();
			group = new NioEventLoopGroup();
			bootstrap.group(group);
			bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
					.option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();
							pipeline.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS));
							// 添加一个http的编解码器
							pipeline.addLast(new HttpClientCodec());
							// 添加一个用于支持大数据流的支持
							pipeline.addLast(new ChunkedWriteHandler());
							// 添加一个聚合器，这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
							pipeline.addLast(new HttpObjectAggregator(1024 * 64));
							pipeline.addLast(handler);

						}
					});

			URI websocketURI = new URI(
					String.format("ws://%s:%s?mac=%s&secretKey=%s", Cache.dispatch.getIp(),  Cache.dispatch.getPort(),Cache.local.getMac(), Cache.dispatch.getSecretKey()));
			HttpHeaders httpHeaders = new DefaultHttpHeaders();
			// 进行握手
			WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI,
					WebSocketVersion.V13, (String) null, true, httpHeaders);
			channel = bootstrap.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
			handler.setHandshaker(handshaker);
			handshaker.handshake(channel);
			// 阻塞等待是否握手成功
			handler.handshakeFuture().sync();
			sendConfig();
			log.info(">>>DispatchServer启动成功：" + channel);
			status = 2;
			channel.closeFuture().sync();
		} catch (InterruptedException | URISyntaxException e) {
			e.printStackTrace();
			status = 3;
			log.error(">>>DispatchServer启动异常：", e);
			stop();
		} finally {
			group.shutdownGracefully();
			if (executor != null)
				executor.shutdownNow();
			log.info(">>>DispatchServer关闭" + channel);
			status = 0;
		}
	}

	public void stop() {
		status = 3;
		if (channel != null) {
			channel.close();
			channel=null;
		}else {
			status = 0;	
		}
	}

	public void restart() {
		stop();
	}

	public int getStatus() {
		return status;
	}

	private ScheduledExecutorService reconnectScheduledExecutor;

	public void reconnect() {
		reconnectScheduledExecutor = new ScheduledThreadPoolExecutor(1);
		reconnectScheduledExecutor.scheduleWithFixedDelay(new TimerTask() {
			@Override
			public void run() {
				if (Cache.dispatch!=null && DispatchServer.getInstance().verify()) {
					if (status == 0) {
						DispatchServer.getInstance().run();
					}
				}else {
					log.error("调度服务无法正常运行：缺少接入点配置...");
				}
			}
		}, 0, 5, TimeUnit.SECONDS);
	}
	
	
	//推送配置
	private void sendConfig() {
		Map<String, Object> nodeConfig = new HashMap<String, Object>();
		nodeConfig.put("type", "renderingConfig");
		nodeConfig.put("data", Cache.local);
		nodeConfig.put("time", System.currentTimeMillis());
		send(nodeConfig);
		sendGPUs();
	}
	
	//推送GPU
	private void sendGPUs() {
		Map<String, Object> gpusConfig = new HashMap<String, Object>();
		gpusConfig.put("type", "GPUs");
		gpusConfig.put("data", GpuUtil.getGPUs());
		gpusConfig.put("time", System.currentTimeMillis());
		send(gpusConfig);
		sendStreamers();
	}
	
	//推送本地池
	private void sendStreamers() {
		Map<String, Object> gpusConfig = new HashMap<String, Object>();
		gpusConfig.put("type", "Streamers");
		gpusConfig.put("data", UEFactory.getInstance().getStreamers());
		gpusConfig.put("time", System.currentTimeMillis());
		send(gpusConfig);
	}
	
	public void send(Map<String, Object> messag) {
		if (channel != null && channel.isOpen() && channel.isActive()) {
			channel.writeAndFlush(new TextWebSocketFrame(JsonKit.toJson(messag)))
			.addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) throws Exception {
					if (future.isSuccess()) {
					} else {
					}
				}
			});
		}
	}

	public void destroy() {
		if(reconnectScheduledExecutor!=null) {
			reconnectScheduledExecutor.shutdown();
		}
		stop();
	}

}
